Big data means collecting, storing, and processing data at a large scale on multiple machines.
Apache Spark is an open source framework that combines an engine for distributing programs across clusters of machines with an elegant model for writing programs atop it.
Features:
One of the most important talents that one can develop as a data scientist is the ability to discover interesting and worthwhile problems in every phase of the data analytics lifecycle.
Learning how to work with Spark in the same language in which the underlying framework is written (Scala) has a number of advantages:
Spark programming starts with a data set or few, usually residing in some form of distributed, persistent storage like the Hadoop Distributed File System (HDFS). Writing a Spark program typically consists of a few related steps:
The general structure of the problem is something like this: we have a large collection of records from one or more source systems, and it is likely that some of the records refer to the same underlying entity, such as a customer, a patient, or the location of a business or an event. The goal is to identify the records that refer to the same entity.
Dataset: curl -o donation.zip http://bit.ly/1Aoywaq
spark-shell, which is a REPL (read-eval-print loop) for the Scala language that also has some Spark-specific extensions.
In [1]:
sc
Out[1]:
There are two ways to create an RDD in Spark: by parallelizing an existing collection of objects in the driver program, or by referencing a data set in external storage, such as HDFS.
In [2]:
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
rdd
Out[2]:
In [3]:
val rdd2 = sc.textFile("file:///usr/local/spark/README.md")
rdd2
Out[3]:
If Spark is given a directory instead of an individual file, it will consider all of the files in that directory as part of the given RDD.
In [4]:
val rawblocks = sc.textFile("hdfs:///user/root/linkage")
rawblocks
Out[4]:
Whenever we create a new variable in Scala, we must preface the name of the variable with either val or var. Variables that are prefaced with val are immutable, and cannot be changed to refer to another value once they are assigned, whereas variables that are prefaced with var can be changed to refer to different objects of the same type.
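A minimal illustration:
val x = 5
// x = 6       // does not compile: reassignment to val
var y = 5
y = 6           // fine: a var can be rebound to another Int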
In [5]:
rawblocks.first()
Out[5]:
Use collect() only when the client (driver) node can hold all of the RDD's data; otherwise, retrieve a small number of elements with take(n).
In [6]:
val head = rawblocks.take(5)
In [7]:
head.length
Out[7]:
The act of creating an RDD does not cause any distributed computation to take place on the cluster. Rather, RDDs define logical data sets that are intermediate steps in a computation. Distributed computation occurs upon invoking an action on an RDD.
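For example (reusing rdd2 from above), defining a transformation does not launch a job, while calling an action does:
val lengths = rdd2.map(line => line.length)   // transformation: nothing runs yet
lengths.count()                               // action: a distributed job runs now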
In [8]:
head.foreach(println)
In [9]:
def isHeader(line: String): Boolean = line.contains("id_1")
In [10]:
head.filter(isHeader).foreach(println)
In [11]:
head.filterNot(isHeader).length
Out[11]:
In [12]:
head.filter(x => !isHeader(x)).length
Out[12]:
In [13]:
head.filter(!isHeader(_)).length
Out[13]:
In [14]:
val noheader = rawblocks.filter(x => !isHeader(x))
In [15]:
noheader.first()
Out[15]:
In [16]:
val line = head(4)
In [17]:
val pieces = line.split(',')
In [18]:
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean
val rawscores = pieces.slice(2, 11)
def toDouble(s: String): Double = if ("?".equals(s)) Double.NaN else s.toDouble
rawscores.map(toDouble)
Out[18]:
Implicits work like this: if you call a method on a Scala object, and the Scala compiler does not see a definition for that method in the class definition for that object, the compiler will try to convert your object to an instance of a class that does have that method defined.
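For example, toInt is not defined on java.lang.String; the call below compiles because the compiler implicitly converts the String to Scala's StringOps, which does define it:
val n = "123".toInt   // works via the implicit String => StringOps conversion in Predef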
In [19]:
def parse(line: String): Tuple4[Int, Int, Array[Double], Boolean] = {
  val pieces = line.split(',')
  val id1 = pieces(0).toInt
  val id2 = pieces(1).toInt
  val scores = pieces.slice(2, 11).map(toDouble)
  val matched = pieces(11).toBoolean
  (id1, id2, scores, matched)
}
In [20]:
val tup = parse(line)
In [21]:
tup._1
Out[21]:
In [22]:
tup._2
Out[22]:
In [23]:
tup._3
Out[23]:
In [24]:
tup._4
Out[24]:
Using case classes, we can create a simple record type that allows us to address our fields by name instead of by position.
In [25]:
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
In [26]:
def parse(line: String): MatchData = {
  val pieces = line.split(',')
  val id1 = pieces(0).toInt
  val id2 = pieces(1).toInt
  val scores = pieces.slice(2, 11).map(toDouble)
  val matched = pieces(11).toBoolean
  MatchData(id1, id2, scores, matched)
}
In [27]:
val md = parse(line)
In [28]:
md.id1
Out[28]:
In [29]:
md.id2
Out[29]:
In [30]:
md.scores
Out[30]:
In [31]:
md.matched
Out[31]:
In [32]:
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
In [33]:
val parsed = noheader.map(x => parse(x))
In [34]:
parsed.cache()
In [35]:
parsed.count()
Out[35]:
Although the contents of RDDs are transient by default, Spark provides a mechanism for persisting the data in an RDD. After the first time an action requires computing such an RDD’s contents, they are stored in memory or disk across the cluster. The next time an action depends on the RDD, it need not be recomputed from its dependencies. Its data is returned from the cached partitions directly.
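As a sketch, cache() is shorthand for persisting with the MEMORY_ONLY storage level; other levels trade memory for disk or recomputation. An RDD's storage level must be chosen before it is first materialized and cannot be changed afterward:
import org.apache.spark.storage.StorageLevel
val parsedOnDisk = noheader.map(x => parse(x))
parsedOnDisk.persist(StorageLevel.MEMORY_AND_DISK)   // spill partitions to disk if they do not fit in memory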
In [36]:
val grouped = mds.groupBy(md => md.matched)
In [37]:
grouped.mapValues(x => x.size).foreach(println)
In [38]:
val matchCounts = parsed.map(md => md.matched).countByValue()
In [39]:
matchCounts
Out[39]:
In [40]:
val matchCountsSeq = matchCounts.toSeq
In [41]:
matchCountsSeq.sortBy(_._1).foreach(println)
In [42]:
matchCountsSeq.sortBy(_._2).reverse.foreach(println)
In [43]:
import java.lang.Double.isNaN
val stats = (0 until 9) map(i => {
parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})
In [44]:
stats
Out[44]:
Whenever we expect that some analysis task we need to perform will be useful again and again, it’s worth spending some time to develop our code in a way that makes it easy for other analysts to use the solution we come up with in their own analyses. To do this, we can write Scala code in a separate file that we can then load into the Spark shell for testing and validation, and we can then share that file with others once we know that it works.
Note: Mark classes as Serializable in order to use their instances inside Spark RDDs; a job will fail if Spark cannot serialize the data contained inside an RDD.
Scala’s object keyword is used to declare a singleton that can provide helper methods for a class, analogous to the static method definitions on a Java class.
A good feature has two properties: it tends to have significantly different values for matches and nonmatches (so the difference between the means will be large) and it occurs often enough in the data that we can rely on it to be regularly available for any pair of records.
The output of a recommender is more intuitively understandable than other machine learning algorithms. A support vector machine classifier is a set of coefficients, and it’s hard even for practitioners to articulate what the numbers mean when they make predictions.
Audioscrobbler was the first music recommendation system for last.fm, one of the first Internet streaming radio sites, founded in 2002. Audioscrobbler provided an open API for “scrobbling,” or recording listeners’ plays of artists’ songs. last.fm used this information to build a powerful music recommender engine.
We need an algorithm that learns without access to user or artist attributes. These are typically called collaborative filtering algorithms.
Latent-Factor Models: They try to explain observed interactions between large numbers of users and products through a relatively small number of unobserved, underlying reasons.
Matrix Factorization Model: Mathematically, these algorithms treat the user and product data as if it were a large matrix A, where the entry at row i and column j exists if user i has played artist j. A is sparse: most entries of A are 0, because only a few of all possible user-artist combinations actually appear in the data. They factor A as the matrix product of two smaller matrices, X and Y. X and Y are very skinny: both have many rows because A has many rows and columns, but both have just a few columns (k). The k columns correspond to the latent factors that are being used to explain the interaction data.
One can use the Alternating Least Squares (ALS) algorithm to compute X and Y.
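As a quick sketch of the idea (variable names are illustrative): A (users × artists) is approximated by X · Y^T, where X is users × k and Y is artists × k, so a predicted preference for a (user, artist) pair is just the dot product of their factor vectors:
def predictedPreference(userFactors: Array[Double], artistFactors: Array[Double]): Double =
  userFactors.zip(artistFactors).map { case (x, y) => x * y }.sum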
In [1]:
val rawUserArtistData = sc.textFile("hdfs:///user/root/recommendation/user_artist_data.txt")
In [2]:
rawUserArtistData.first()
Out[2]:
In [3]:
rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
Out[3]:
In [4]:
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
Out[4]:
In [5]:
rawUserArtistData.map(_.split(' ')(2).toDouble).stats()
Out[5]:
In [6]:
val rawArtistData = sc.textFile("hdfs:///user/root/recommendation/artist_data.txt")
In [7]:
val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case e: NumberFormatException => None
    }
  }
}
In [8]:
artistByID.first()
Out[8]:
In [9]:
val rawArtistAlias = sc.textFile("hdfs:///user/root/recommendation/artist_alias.txt")
In [10]:
val artistAlias = rawArtistAlias.flatMap { line =>
  val tokens = line.split('\t')
  if (tokens(0).isEmpty) {
    None
  } else {
    Some((tokens(0).toInt, tokens(1).toInt))
  }
}.collectAsMap()
In [11]:
artistAlias
Out[11]:
In [12]:
artistByID.lookup(6803336)
Out[12]:
In [13]:
artistByID.lookup(1000010)
Out[13]:
In [14]:
import org.apache.spark.mllib.recommendation._
val bArtistAlias = sc.broadcast(artistAlias)
val trainData = rawUserArtistData.map { line =>
  val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
  val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
  Rating(userID, finalArtistID, count)
}.cache()
In [15]:
trainData.first()
Out[15]:
When Spark runs a stage, it creates a binary representation of all the information needed to run tasks in that stage, called the closure of the function that needs to be executed. This closure includes all the data structures on the driver referenced in the function. Spark distributes it to every executor on the cluster.
Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:
In [ ]:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
We should first see if the artist recommendations make any intuitive sense, by examining a user, his or her plays, and recommendations for that user.
In [16]:
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).
filter { case Array(user, _, _) => user.toInt == 2093760}
In [18]:
val existingProducts = rawArtistsForUser.map { case Array(_, artist, _) => artist.toInt}.collect().toSet
In [20]:
artistByID.filter { case (id, name) => existingProducts.contains(id)}.values.collect().foreach(println)
In [ ]:
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
To make this meaningful, some of the artist play data can be set aside and hidden from the ALS model building process. Then, this held-out data can be interpreted as a collection of good recommendations for each user, but one that the recommender has not already been given. The recommender is asked to rank all items in the model, and the ranks of the held-out artists are examined. Ideally, the recommender places all of them at or near the top of the list.
We can then compute the recommender’s score by comparing all held-out artists’ ranks to the rest. (In practice, we compute this by examining only a sample of all such pairs, because a potentially huge number of such pairs may exist.) The fraction of pairs where the held-out artist is ranked higher is its score. 1.0 is perfect, 0.0 is the worst possible score, and 0.5 is the expected value achieved from randomly ranking artists.
This metric is directly related to an information retrieval concept, called the Receiver Operating Characteristic (ROC) curve. The metric in the preceding paragraph equals the area under this ROC curve, and is indeed known as AUC, for Area Under the Curve. AUC may be viewed as the probability that a randomly chosen good recommendation ranks above a randomly chosen bad recommendation.
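A minimal sketch of that pairwise interpretation (not MLlib's implementation), assuming we already have the model's scores for the held-out ("good") artists and for a sample of other artists; ties are ignored for simplicity:
def approxAUC(goodScores: Seq[Double], otherScores: Seq[Double]): Double = {
  // fraction of (good, other) pairs in which the good artist scores higher
  val wins = for (g <- goodScores; o <- otherScores) yield if (g > o) 1.0 else 0.0
  wins.sum / wins.size
}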
Choosing good hyperparameter values is a common problem in machine learning. The most basic way to choose values is to simply try combinations of values and evaluate a metric for each of them, and choose the combination that produces the best value of the metric.
Supervised learning algorithms require a body of inputs and outputs to learn from. They need to be fed both questions and known answers.
Types: classification (predicting a categorical target) and regression (predicting a numeric target).
Most algorithms you will likely encounter in analytics packages and libraries are classification or regression techniques, like support vector machines, logistic regression, naïve Bayes, neural networks, and deep learning.
The inputs of a machine learning algorithm are the features the model needs to predict the output. These features should be properly structured before being fed into the model, and one way to achieve this is by creating feature vectors.
The number of dimensions in a feature vector can be greater than, less than, or equal to the number of raw features, depending on how those features are encoded.
There are two broad groups of features: numeric and categorical.
A learning algorithm needs to train on data in order to make predictions. It requires a large number of inputs, and known correct outputs, from historical data. Feature vectors provide an organized way to describe input to a learning algorithm. The output, or target, of the prediction can also be thought of as a feature. Therefore, it’s not uncommon to simply include the target as another feature in the feature vector.
Pros of using Decision Trees:
Decision trees generalize into a more powerful algorithm, called random decision forests.
How decision trees work:
The series of yes/no decisions that lead to a prediction are what decision trees embody. Each decision leads to one of two results, which is either a prediction or another decision. It is natural to think of the process as a tree of decisions, where each internal node in the tree is a decision, and each leaf node is a final answer.
The data set records the types of forest covering parcels of land in Colorado, USA. Each example contains several features describing each parcel of land, like its elevation, slope, distance to water, shade, and soil type, along with the known forest type covering the land. The forest cover type is to be predicted from the rest of the features, of which there are 54 in total.
Covtype data set, available online as a compressed CSV-format data file, covtype.data.gz, and accompanying info file, covtype.info.
The Spark MLlib abstraction for a feature vector is known as a LabeledPoint, which consists of a Spark MLlib Vector of features, and a target value, here called the label. The target is a Double value, and Vector is essentially an abstraction on top of many Double values. This suggests that LabeledPoint is only for numeric features. It can be used with categorical features, with appropriate encoding.
One such encoding is one-hot or 1-of-n encoding, in which one categorical feature that takes on N distinct values becomes N numeric features, each taking on the value 0 or 1. Exactly one of the N values has value 1, and the others are 0.
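A small sketch of one-hot encoding, assuming a hypothetical weather feature with three values:
val categories = Array("clear", "cloudy", "rainy")
def oneHot(value: String): Array[Double] =
  categories.map(c => if (c == value) 1.0 else 0.0)   // exactly one 1.0, the rest 0.0
oneHot("cloudy")   // Array(0.0, 1.0, 0.0)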
Another possible encoding simply assigns a distinct numeric value to each possible value of the categorical feature.
Note: Be careful when encoding a categorical feature as a single numeric feature. The original categorical values have no ordering, but when encoded as a number, they appear to. Treating the encoded feature as numeric leads to meaningless results because the algorithm is effectively pretending that rainy is somehow greater than, and two times larger than, cloudy . It’s OK as long as the encoding’s numeric value is not used as a number.
In [1]:
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression._
val rawData = sc.textFile("hdfs:///user/root/covtype/covtype.data")
In [31]:
rawData.first()
Out[31]:
In [2]:
val data = rawData.map { line =>
  val values = line.split(',').map(_.toDouble)
  val featureVector = Vectors.dense(values.init)
  val label = values.last - 1
  LabeledPoint(label, featureVector)
}
In [33]:
data.first()
Out[33]:
In [3]:
val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
In [4]:
trainData.cache()
cvData.cache()
testData.cache()
Out[4]:
In [5]:
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._
In [6]:
val model = DecisionTree.trainClassifier(trainData, 7, Map[Int,Int](), "gini", 4, 100)
In [38]:
def getMetrics(model: DecisionTreeModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
  val predictionsAndLabels = data.map(example =>
    (model.predict(example.features), example.label)
  )
  new MulticlassMetrics(predictionsAndLabels)
}
In [39]:
val metrics = getMetrics(model, cvData)
In [40]:
metrics.confusionMatrix
Out[40]:
In [52]:
metrics.precision(0)
Out[52]:
In [53]:
metrics.recall(0)
Out[53]:
In [51]:
(0 until 7).map (
cat => (metrics.precision(cat), metrics.recall(cat))
).foreach(println)
The hyperparameters that control how the tree’s decisions are chosen are the maximum depth, the maximum number of bins, and the impurity measure.
val evaluations =
  for (impurity <- Array("gini", "entropy");
       depth <- Array(1, 20);
       bins <- Array(10, 300))
  yield {
    val model = DecisionTree.trainClassifier(trainData, 7, Map[Int, Int](), impurity, depth, bins)
    val predictionsAndLabels = cvData.map(example => (model.predict(example.features), example.label))
    val accuracy = new MulticlassMetrics(predictionsAndLabels).precision
    ((impurity, depth, bins), accuracy)
  }
evaluations.sortBy(_._2).reverse.foreach(println)
In [ ]:
val model = DecisionTree.trainClassifier(trainData.union(cvData), 7, Map[Int, Int](), "entropy", 20, 300)
In order to use categorical features directly inside the model, change Map[Int, Int]() to Map(10 -> 4, 11 -> 40), which states that feature 10 has 4 categories and feature 11 has 40.
Decision trees use several heuristics to be smarter about which few rules to actually consider. The process of picking rules also involves some randomness; only a few features picked at random are looked at each time, and only values from a random subset of the training data. This trades a bit of accuracy for a lot of speed, but it also means that the decision tree algorithm won’t build the same tree every time.
It would be great to have not one tree, but many trees, each producing reasonable but different and independent estimations of the right target value. Their collective average prediction should fall close to the true answer, more than any individual tree’s does. It’s the randomness in the process of building that helps create this independence. This is the key to random decision forests.
In [ ]:
val forest = RandomForest.trainClassifier(trainData, 7, Map(10 -> 4, 11 -> 40), 20, "auto", "entropy", 30, 300)
Random decision forests are appealing in the context of big data because trees are supposed to be built independently, and big-data technologies like Spark and MapReduce are inherently suited to data-parallel problems, where parts of the overall solution can be computed independently on parts of the data. The fact that trees can, and should, train on only a subset of features or input data makes it trivial to parallelize building of the trees.
In [8]:
val input = "2709,125,28,67,23,3224,253,207,61,6094,0,29"
val vector = Vectors.dense(input.split(',').map(_.toDouble))
In [ ]:
model.predict(vector)
In supervised learning, in order to predict unknown values for new data, we had to know that target value for many previously seen examples. However, there are problems in which the correct output is unknown for some or all examples. Fortunately, unsupervised learning techniques can help. These techniques do not learn to predict any target value, because none is available. They can, however, learn structure in data, and find groupings of similar inputs, or learn what types of input are likely to occur and what types are not.
The problem of anomaly detection is, as its name implies, that of finding unusual things. Anomaly detection is often used to find fraud, detect network attacks, or discover problems in servers or other sensor-equipped machinery. Unsupervised learning techniques are useful in these cases, because they can learn what input data normally looks like, and therefore detect when new data is unlike past data. Such new data is not necessarily attacks or fraud; it is simply unusual, and therefore, worth further investigation.
Clustering is the best-known type of unsupervised learning. Clustering algorithms try to find natural groupings in data. Data points that are like one another, but unlike others, are likely to represent a meaningful grouping, and so clustering algorithms try to put such data into the same cluster.
K-means clustering is maybe the most widely used clustering algorithm. It attempts to detect k clusters in a data set, where k is given by the data scientist. k is a hyperparameter of the model, and the right value will depend on the data set.
To start, the algorithm picks some data points as the initial cluster centroids. Then each data point is assigned to the nearest centroid. Then for each cluster, a new cluster centroid is computed as the mean of the data points just assigned to that cluster. This process is repeated.
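A minimal sketch of one such iteration on plain Scala collections (not MLlib), to make the assign-then-recompute loop concrete:
def kmeansStep(points: Seq[Array[Double]], centroids: IndexedSeq[Array[Double]]): IndexedSeq[Array[Double]] = {
  def dist(a: Array[Double], b: Array[Double]) =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  // assign each point to the index of its nearest centroid
  val assigned = points.groupBy(p => centroids.indices.minBy(i => dist(p, centroids(i))))
  // recompute each centroid as the mean of its assigned points (or keep it if it got none)
  centroids.indices.map { i =>
    val cluster = assigned.getOrElse(i, Seq(centroids(i)))
    cluster.transpose.map(_.sum / cluster.size).toArray
  }
}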
Some attacks attempt to flood a computer with network traffic to crowd out legitimate traffic. But in other cases, attacks attempt to exploit flaws in networking software to gain unauthorized access to a computer. While it’s quite obvious when a computer is being bombarded with traffic, detecting an exploit can be like searching for a needle in an incredibly large haystack of network requests.
Here, unsupervised learning techniques like K-means can be used to detect anomalous network connections. K-means can cluster connections based on statistics about each of them.
The KDD Cup was an annual data mining competition organized by a special interest group of the ACM. The data set is about 708 MB and contains about 4.9M connections. For each connection, the data set contains information like the number of bytes sent, login attempts, TCP errors, and so on. Each connection is one line of CSV-formatted data, containing 38 features.
In [14]:
val rawData = sc.textFile("hdfs:///user/root/kddcup/kddcup.data")
In [15]:
rawData.first()
Out[15]:
In [19]:
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
In [26]:
import org.apache.spark.mllib.linalg._
val labelsAndData = rawData.map { line =>
  val buffer = line.split(',').toBuffer
  buffer.remove(1, 3)
  val label = buffer.remove(buffer.length - 1)
  val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
  (label, vector)
}
In [27]:
val data = labelsAndData.values.cache()
In [28]:
data.first()
Out[28]:
In [30]:
import org.apache.spark.mllib.clustering._
val kmeans = new KMeans()
val model = kmeans.run(data)
In [31]:
model.clusterCenters.foreach(println)
In [32]:
val clusterLabelCount = labelsAndData.map { case(label, datum) =>
val cluster = model.predict(datum)
(cluster, label)
}.countByValue
In [33]:
clusterLabelCount.toSeq.sorted.foreach {
case ((cluster, label), count) =>
println(f"$cluster%1s$label%18s$count%8s")
}
In [34]:
def distance(a: Vector, b: Vector) =
math.sqrt(a.toArray.zip(b.toArray).
map(p => p._1 - p._2).map(d => d * d).sum)
In [35]:
def distToCentroid(datum: Vector, model: KMeansModel) = {
val cluster = model.predict(datum)
val centroid = model.clusterCenters(cluster)
distance(centroid, datum)
}
In [36]:
import org.apache.spark.rdd._
def clusteringScore(data: RDD[Vector], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}
In [ ]:
(5 to 40 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)
We can improve this by running the clustering many times for a value of k, with a different random starting centroid set each time, and picking the best clustering. The algorithm exposes setRuns() to set the number of times the clustering is run for one k.
Reading data out of HDFS into other analysis environments, such as R, can also be accomplished with libraries like rhdfs.
We can normalize each feature by converting it to a standard score. This means subtracting the mean of the feature’s values from each value, and dividing by the standard deviation.
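A sketch of that normalization over the data RDD from above (the helper name is illustrative): compute per-dimension means and standard deviations once, then apply them to each vector:
def buildNormalizer(data: RDD[Vector]): Vector => Vector = {
  val asArrays = data.map(_.toArray)
  val n = asArrays.count()
  val sums = asArrays.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
  val sumSquares = asArrays.map(a => a.map(v => v * v)).reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
  val means = sums.map(_ / n)
  val stdevs = sumSquares.zip(sums).map { case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n }
  (datum: Vector) => Vectors.dense(
    datum.toArray.zip(means).zip(stdevs).map {
      case ((value, mean), stdev) =>
        if (stdev <= 0) value - mean else (value - mean) / stdev   // avoid dividing by zero for constant features
    }
  )
}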
The categorical features can be translated into several binary indicator features using one-hot encoding, and those indicators can be treated as numeric dimensions.
A good clustering would have clusters whose collections of labels are homogeneous and so have low entropy. A weighted average of entropy can therefore be used as a cluster score.
Anomaly detection amounts to measuring a new data point’s distance to its nearest centroid. If this distance exceeds some threshold, it is anomalous. This threshold might be chosen to be the distance of, say, the 100th-farthest data point from among known data.
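A sketch of that rule, reusing distToCentroid and the fitted model from above; the threshold here is the distance of the 100th-farthest training point:
val distances = data.map(datum => distToCentroid(datum, model))
val threshold = distances.top(100).last   // 100th-largest distance to a centroid
def isAnomaly(datum: Vector): Boolean = distToCentroid(datum, model) > threshold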
Most of the work in data engineering consists of assembling data into some sort of queryable format.
The process of preparing data into a format that humans can interact with is not so much “assembly,” but rather “indexing” in the nice case or “coercion” when things get ugly. A standard search index permits fast queries for the set of documents that contains a given set of terms. Sometimes, however, we want to find documents that relate to the concepts surrounding a particular word whether or not the documents contain that exact string. Standard search indexes often fail to capture the latent structure in the text’s subject matter.
Latent Semantic Analysis (LSA) is a technique in natural language processing and information retrieval that seeks to better understand a corpus of documents and the relationships between the words in those documents. LSA discovers this lower-dimensional representation using a linear algebra technique called singular value decomposition (SVD).
SVD can be thought of as a more powerful version of the ALS factorization. It starts with a term-document matrix generated through counting word frequencies for each document.In this matrix, each document corresponds to a column, each term corresponds to a row, and each element represents the importance of a word to a document. SVD then factorizes this matrix into three matrices, one of which expresses concepts in regard to documents, one of which expresses concepts in regard to terms, and one of which contains the importance for each concept. The structure of these matrices is such that we can achieve a low-rank approximation of the original matrix by removing a set of their rows and columns corresponding to the least important concepts. That is, the matrices in this low-rank approximation can be multiplied to produce a matrix close to the original, with increasing loss of fidelity as each concept is removed.
Before performing any analysis, LSA requires transforming the raw text of the corpus into a term-document matrix. Loosely, the value at each position should correspond to the importance of the row’s term to the column’s document. A few weighting schemes have been proposed, but by far the most common is term frequency times inverse document frequency, commonly abbreviated as TF-IDF
In [40]:
def termDocWeight(termFrequencyInDoc: Int, totalTermsInDoc: Int, termFreqInCorpus: Int, totalDocs: Int): Double = {
  val tf = termFrequencyInDoc.toDouble / totalTermsInDoc
  val docFreq = totalDocs.toDouble / termFreqInCorpus
  val idf = math.log(docFreq)
  tf * idf
}
The model relies on a few assumptions. It treats each document as a “bag of words,” meaning that it pays no attention to the ordering of words, sentence structure, or negations. By representing each term once, the model has difficulty dealing with polysemy, the use of the same word for multiple meanings.
XmlInputFormat is derived from the Apache Mahout project, that can split up the enormous Wikipedia dump into documents. Turning the Wiki XML into the plain text of article contents is done with the Cloud9 APIs.
import com.cloudera.datascience.common.XmlInputFormat
import edu.umd.cloud9.collection.wikipedia.language._
import edu.umd.cloud9.collection.wikipedia._
With the plain text in hand, next we need to turn it into a bag of terms.
Stemming refers to heuristics-based techniques for chopping off characters at the ends of words, while lemmatization refers to more principled approaches. The Stanford Core NLP project provides an excellent lemmatizer with a Java API that Scala can take advantage of.
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
Filtering out less frequent terms can both improve performance and remove noise. A reasonable choice is to leave out all but the top N most frequent words, where N is somewhere in the tens of thousands.
With the term-document matrix M in hand, the analysis can proceed to the factorization and dimensionality reduction. MLlib contains an implementation of the singular value decomposition (SVD) that can handle enormous matrices. The singular value decomposition takes an m × n matrix and returns three matrices that approximately equal it when multiplied together.
In the LSA case, m is the number of documents and n is the number of terms. The decomposition is parameterized with a number k, less than or equal to n, that indicates how many concepts to keep around. When k = n, the product of the factormatrices reconstitutes the original matrix exactly. When k < n, the multiplication results in a low-rank approximation of the original matrix. k is typically chosen to be much smaller than n. SVD ensures that the approximation will be the closest possible to the original matrix (as defined by the L2 Norm—that is, the sum of squares—of the difference), given the constraint that it needs to be expressible in only k concepts.
The V matrix represents concepts through the terms that are important to them. As discussed earlier, V contains a column for every concept and a row for every term. The value at each position can be interpreted as the relevance of that term to that concept.
We can achieve a relevance score between two terms by computing the cosine similarity between their two column vectors in the matrix. Cosine similarity measures the angle between two vectors. Vectors that point in the same direction in the high-dimensional document space are thought to be relevant to each other. It is computed as the dot product of the vectors divided by the product of their lengths. Cosine similarity sees wide use as a similarity metric between vectors of term and document weights in natural language and information retrieval applications.
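As a small sketch, the cosine similarity of two dense vectors:
def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val lengthA = math.sqrt(a.map(x => x * x).sum)
  val lengthB = math.sqrt(b.map(x => x * x).sum)
  dot / (lengthA * lengthB)   // 1.0 = same direction, 0.0 = orthogonal
}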
The LSA representation also offers benefits from an efficiency standpoint. It packs the important information into a lower-dimensional representation that can be operated on instead of the original term-document matrix. Consider the task of finding the set of terms most relevant to a particular term. The naive approach requires computing the dot product between that term’s column vector and every other column vector in the term-document matrix. This involves a number of multiplications proportional to the number of terms times the number of documents. LSA can achieve the same by looking up its concept-space representation and mapping it back into term space, requiring a number of multiplications only proportional to the number of terms times k. The low-rank approximation encodes the relevant patterns in the data so the full corpus need not be queried.
LSA offers a more useful representation of the data. It offers this in a few ways:
However, we need not actually calculate the contents of this matrix to discover the cosine similarity. Some linear algebra manipulation reveals that the cosine similarity between two columns in the reconstructed matrix is exactly equal to the cosine similarity between the corresponding columns in S V^T.
The same goes for computing relevance scores between documents. To find the similarity between two documents, compute the cosine similarity between u_1^T S and u_2^T S, where u_i is the row in U corresponding to document i. To find the similarity between a document and all other documents, compute normalized(U S) S u_d^T.
Computing the similarity between a term and every document is equivalent to U S v_t, where v_t is the row in V corresponding to term t. In the other direction, the similarity between a document and every term comes from u_d^T S V^T.
Querying in this way is like adding a new document to the corpus with just a few terms, finding its representation as a new row of the low-rank term-document matrix approximation, and then discovering the cosine similarity between it and the other entries in this matrix.
Data scientists are interested in understanding relationships between entities, whether between neurons, individuals, or countries, and how these relationships affect the observed behavior of the entities.
The explosion of digital data over the past decade gave researchers access to vast quantities of information about these relationships, and required that they develop new skills in order to acquire and manage these data sets.
Network science applies tools from graph theory, the mathematical discipline that studies the properties of pairwise relationships (called edges) between a set of entities (called vertices).
MEDLINE (Medical Literature Analysis and Retrieval System Online) is a database of academic papers that have been published in journals covering the life sciences and medicine. It is managed and released by the United States National Library of Medicine (NLM), a division of the National Institute of Health (NIH). Its citation index, which tracks the publication of articles across thousands of journals, can trace its history back to 1879, and it has been available online to medical schools since 1971 and to the general public via the World Wide Web since 1996. The main database contains more than 20 million articles going back to the early 1950s and is updated five days a week.
Due to the volume of citations and the frequency of updates, the research community developed an extensive set of semantic tags called MeSH (Medical Subject Headings) that are applied to all of the citations in the index. These tags provide a meaningful framework that can be used to explore relationships between documents to facilitate literature reviews.
wget ftp://ftp.nlm.nih.gov/nlmdata/sample/medline/*.gz
Scala comes with an excellent library called scala-xml for parsing and querying XML documents.
In [50]:
import scala.xml._
val cit = <MeshHeading><DescriptorName MajorTopicYN="N">Behavior</DescriptorName></MeshHeading>
In [54]:
cit \\ "MeshHeading"
Out[54]:
In [57]:
cit \ "DescriptorName"
Out[57]:
Each entry in the MEDLINE data set is a list of strings: the names of the topics mentioned in that citation record. To get the co-occurrences, we need to generate all of the two-element subsets of this list of strings.
Scala’s collections library has a built-in method called combinations to make generating these sublists extremely easy.
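For example (the topic names are illustrative), sorting first keeps each pair in a canonical order:
val topics = List("Behavior", "Research", "Risk Factors")
topics.sorted.combinations(2).toList
// List(List(Behavior, Research), List(Behavior, Risk Factors), List(Research, Risk Factors))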
When we’re studying co-occurrence networks, our standard tools for summarizing data don’t provide us much insight. The overall summary statistics we can calculate, like raw counts, don’t give us a feel for the overall structure of the relationships in the network, and the co-occurrence pairs that we can see at the extremes of the distribution are usually the ones that we care about least.
What we really want to do is analyze the co-occurrence network as a network: by thinking of the topics as vertices in a graph, and the existence of a citation record that features both topics as an edge between those two vertices. Then, we could compute network-centric statistics that would help us understand the overall structure of the network and identify interesting local outlier vertices that are worthy of further investigation.
GraphX is a Spark library that is designed to help us analyze various kinds of networks using the language and tools of graph theory.
GraphX is based on two specialized RDD implementations that are optimized for graphs. VertexRDD[VD] and EdgeRDD[ED].
The VertexRDD[VD] is a specialized implementation of RDD[(VertexId, VD)] , where the VertexID type is an instance of Long and is required for every vertex, while the VD can be any other type of data that is associated with the vertex, and is called the vertex attribute. The EdgeRDD[ED] is a specialized implementation of RDD[Edge[ED]] , where Edge is a case class that contains two VertexId values and an edge attribute of type ED .
Creating a unique 64-bit value that can be associated with each topic string:
One option we could use would be to use the built-in hashCode method that will generate a 32-bit integer for any given Scala object. For our problem, which only has 13,000 vertices in the graph, the hash code trick will probably work. But for graphs that have millions or tens of millions of vertices, the probability of a hash code collision might be unacceptably high. For this reason, we’re going to use the Hashing library from Google’s Guava Library to create a unique 64-bit identifier for each topic using the MD5 hashing algorithm.
import com.google.common.hash.Hashing
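A sketch of such a helper (hashId is an illustrative name): hash the topic string with MD5 and keep the first 8 bytes as a Long:
def hashId(str: String): Long =
  Hashing.md5().hashString(str, java.nio.charset.StandardCharsets.UTF_8).asLong()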
A good habit to get into when you are generating edges is to ensure that the left side VertexId (which GraphX refers to as the src ) is less than the right side VertexId (which GraphX refers to as the dst ).
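A sketch of applying that convention when constructing a GraphX edge, reusing the hashId helper sketched above:
import org.apache.spark.graphx.Edge
def topicEdge(topicA: String, topicB: String): Edge[Int] = {
  val (a, b) = (hashId(topicA), hashId(topicB))
  if (a < b) Edge(a, b, 1) else Edge(b, a, 1)   // attribute 1 represents a single co-occurrence
}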
The Graph class provides built-in methods for calculating a number of these statistics, and in combination with the regular Spark RDD APIs, makes it easy for us to quickly get a feel for the structure of a graph to guide our exploration.
In the current co-occurrence graph, the edges are weighted based on the count of how often a pair of concepts appears in the same paper. The problem with this simple weighting scheme is that it doesn’t distinguish concept pairs that occur together because they have a meaningful semantic relationship from concept pairs that occur together because they happen to both occur frequently for any type of document.
We will use Pearson’s chi-squared test to calculate this “interestingness” in a principled way—that is, to test whether the occurrence of a particular concept is independent from the occurrence of another concept.
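A sketch of the 2×2 chi-squared statistic (without continuity correction), where YY counts citations mentioning both topics, YN and NY exactly one of them, and NN neither:
def chiSq(YY: Long, YN: Long, NY: Long, NN: Long): Double = {
  val T = (YY + YN + NY + NN).toDouble
  val observedDiff = YY.toDouble * NN - YN.toDouble * NY
  T * observedDiff * observedDiff /
    ((YY + YN).toDouble * (NY + NN) * (YY + NY) * (YN + NN))
}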
These are networks that exhibit "small-world" properties: most vertices have a small degree and sit in dense, clique-like neighborhoods, and yet it is possible to reach any vertex from any other in a relatively small number of hops.
Unfortunately, finding cliques in a given graph turns out to be NP-complete, so we fall back on simpler local metrics. One of these metrics is the triangle count at a vertex. A triangle is a complete graph on three vertices, and the triangle count at a vertex V is simply the number of triangles that contain V. The triangle count is a measure of how many neighbors of V are also connected to each other. Watts and Strogatz defined a new metric, called the local clustering coefficient, that is the ratio of the actual triangle count at a vertex to the number of possible triangles at that vertex based on how many neighbors it has.
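A sketch of the local clustering coefficient for a vertex with a given triangle count and degree d (the number of possible triangles among its neighbors is d·(d−1)/2):
def localClusteringCoefficient(triangleCount: Long, degree: Int): Double =
  if (degree < 2) 0.0
  else triangleCount.toDouble / (degree.toLong * (degree - 1) / 2.0)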
Computing the path length between vertices in a graph is an iterative process that is similar to the iterative process we use to find the connected components. At each phase of the process, each vertex will maintain a collection of the vertices that it knows about and how far away each vertex is. Each vertex will then query its neighbors about the contents of their lists, and it will update its own list with any new vertices that are contained in its neighbors’ lists that were not contained in its own list. This process of querying neighbors and updating lists will continue across the entire graph until none of the vertices are able to add any new information to their lists.
This iterative, vertex-centric method of parallel programming on large, distributed graphs is based on a paper that Google published in 2009 called “Pregel: a system for large-scale graph processing”. Pregel is based on a model of distributed computation that predates MapReduce called “bulk-synchronous parallel,” or BSP. BSP programs divide parallel processing stages into two phases: computation and communication. During the computation phase, each vertex in the graph examines its own internal state and decides to send zero or more messages to other vertices in the graph. During the communication phase, the Pregel framework handles routing the messages that resulted from the previous communication phase to the appropriate vertices, which then process those messages, update their internal state, and potentially generate new messages during the next computation phase. The sequence of computation and communication steps continues until all of the vertices in the graph vote to halt, at which point the computation is finished.
To carry out this analysis, we need to deal with two types of data that come up all the time: temporal data, such as dates and times, and geospatial information, like points of longitude and latitude and spatial boundaries.
One of the great features of the Java platform is the sheer volume of code that has been developed for it over the years: for any kind of data type or algorithm you might need to use, it’s likely that someone else has written a Java library that you can use to solve your problem, and there’s also a good chance that an open source version of that library exists that you can download and use without having to purchase a license.
JodaTime has been the Java library of choice for working with temporal data. There is a wrapper library named NScalaTime that provides some additional syntactic sugar for working with JodaTime from Scala.
import com.github.nscala_time.time.Imports._
For data analysis problems, we usually need to convert some string representation of a date into a DateTime object on which we can do calculations. A simple way to accomplish this is with Java’s SimpleDateFormat , which is useful for parsing dates in different formats.
JodaTime handles all of the tedious details of different time zones and quirks of the calendar like Daylight Saving Time when it performs these duration calculations so that you don’t have to worry about them.
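For example (the timestamp format string is an assumption about how the taxi data is laid out):
import java.text.SimpleDateFormat
import org.joda.time.{DateTime, Duration}
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val pickup  = new DateTime(formatter.parse("2013-01-01 23:54:15"))
val dropoff = new DateTime(formatter.parse("2013-01-02 00:12:00"))
val tripLength = new Duration(pickup, dropoff)
tripLength.getStandardMinutes   // 17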
Note that from Java SE 8 onwards, users are asked to migrate to java.time (JSR-310) - a core part of the JDK which replaces this project.
There are two major kinds of geospatial data, vector and raster, and there are different tools for working with each. In our case, we have latitude and longitude for our taxi trip records, and vector data stored in the GeoJSON format that represents the boundaries of the different boroughs of New York. So we need a library that can parse GeoJSON data and can handle spatial relationships, like detecting whether a given longitude/latitude pair is contained inside of a polygon that represents the boundaries of a particular borough.
The Esri API provides a convenience class called GeometryEngine that contains static methods for performing all of the spatial relationship operations, including a contains operation. The contains method takes three arguments: two Geometry objects, and one instance of the SpatialReference class, which represents the coordinate system used to perform the geospatial calculations.
The data we’ll use for the boundaries of boroughs in New York City comes written in a format called GeoJSON. The core object in GeoJSON is called a feature, which is made up of a geometry instance and a set of key-value pairs called properties. A geometry is a shape like a point, line, or polygon. A set of features is called a FeatureCollection.
GeoJSON data for the NYC borough maps:
The Esri Geometry API will help us parse the geometry JSON inside of each feature, but won’t help us with parsing the id or the properties fields, which can be arbitrary JSON objects. To parse these objects, we’re going to need to use a Scala JSON library, of which there are many that we can choose from. Spray, an open source toolkit for building web services with Scala, provides a JSON library that is up to the task. spray-json allows us to convert any Scala object to a corresponding JsValue by calling an implicit toJson method, and it also allows us to convert any String that contains JSON to a parsed intermediate form by calling parseJson , and then convert it to a Scala type T by calling convertTo[T] on the intermediate type.
With the GeoJSON and JodaTime libraries in hand, it’s time to begin analyzing the NYC taxi trip data.
A common approach is to add a try-catch block to the parsing code so that any invalid records can be written out to the logs without causing the entire job to fail. With Spark, we can adapt our parsing code so that we can interactively analyze the invalid records in our data just as easily as we would perform any other kind of analysis.
In [58]:
def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
  new Function[S, Either[T, (S, Exception)]] with Serializable {
    def apply(s: S): Either[T, (S, Exception)] = {
      try {
        Left(f(s))
      } catch {
        case e: Exception => Right((s, e))
      }
    }
  }
}
We can now create a safe wrapper function called safeParse by passing our parse function (of type String => Trip ) to the safe function, and then applying safeParse to the taxiRaw RDD.
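For example, assuming a parse function of type String => Trip and the taxiRaw RDD mentioned above, the Left/Right split makes it easy to keep the good records and inspect the bad ones separately:
val safeParse = safe(parse)
val parsedRows = taxiRaw.map(safeParse)
val goodTrips = parsedRows.collect { case Left(trip) => trip }                 // successfully parsed trips
val badRows   = parsedRows.collect { case Right((line, err)) => (line, err) }  // raw line plus the exception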
Given the temporal nature of our trip data, one reasonable invariant that we can expect is that the dropoff time for any trip will be sometime after the pickup time.
Let’s start examining the geospatial aspects of the taxi data. For each trip, we have a longitude/latitude pair representing where the passenger(s) were picked up and another one for where they were dropped off. We would like to be able to determine which borough each of these longitude/latitude pairs belongs to, and identify any trips that did not start or end in any of the five boroughs.
The kind of analysis in which we want to analyze a single entity as it executes a series of events over time, is called sessionization.
The naive way to create sessions in Spark is to perform a groupBy on the identifier we want to create sessions for and then sort the events post-shuffle by a timestamp identifier.
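A sketch of that naive approach (Event is a made-up record type with an entity id and a timestamp):
case class Event(id: String, timestamp: Long)
def naiveSessions(events: RDD[Event]): RDD[(String, Seq[Event])] =
  events.groupBy(_.id).mapValues(_.toSeq.sortBy(_.timestamp))   // shuffle by id, then sort each group in memory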
In MapReduce, we can build sessions by performing a secondary sort, where we create a composite key made up of an identifier and a timestamp value, sort all of the records on the composite key, and then use a custom partitioner and grouping function to ensure that all of the records for the same identifier appear in the same output partition.
Executing a sessionization pipeline is an expensive operation, and the sessionized data is often useful for many different analysis tasks that we might want to perform.
Many of the most sophisticated approaches to estimating this statistic rely on computationally intensive simulation of markets under random conditions. The technique behind these approaches, called Monte Carlo simulation, involves posing thousands or millions of random market scenarios and observing how they tend to affect a portfolio. Spark is an ideal tool for Monte Carlo simulation, because the technique is naturally massively parallelizable. Spark can leverage thousands of cores to run random trials and aggregate their results. As a general-purpose data transformation engine, it is also adept at performing the pre- and post-processing steps that surround the simulations. It can transform the raw financial data into the model parameters needed to carry out the simulations, as well as support ad-hoc analysis of the results.
VaR is a simple measure of investment risk that tries to provide a reasonable estimate of the maximum probable loss in value of an investment portfolio over a particular time period. A VaR statistic depends on three parameters: a portfolio, a time period, and a p-value. A VaR of 1 million dollars with a 5% p-value and two weeks indicates the belief that the portfolio stands only a 5% chance of losing more than 1 million dollars over two weeks.
Estimating this statistic requires proposing a model for how a portfolio functions and choosing the probability distribution its returns are likely to take.
Variance-Covariance: this method's model assumes that the return of each instrument is normally distributed, which allows deriving an estimate analytically.
Historical Simulation extrapolates risk from historical data by using its distribution directly instead of relying on summary statistics. A drawback of this method is that historical data can be limited and fails to include “what-ifs.” The history we have for the instruments in our portfolio may lack market collapses, but we might wish to model what happens to our portfolio in these situations.
Monte Carlo Simulation tries weakening the assumptions in the previous methods by simulating the portfolio under random conditions. When we can’t derive a closed form for a probability distribution analytically, we can often estimate its density function (PDF) by repeatedly sampling simpler random variables that it depends on and seeing how it plays out in aggregate.
In its most general form, this method:
A Monte Carlo risk model typically phrases each instrument’s return in terms of a set of market factors. We then need a model that predicts the return of each instrument based on these market conditions. In our simulation, we’ll use a simple linear model.
We’ll derive a set of features from simple transformations of the factor returns. That is, the market factor vector m_t for a trial t is transformed by some function φ to produce a feature vector f_t, possibly of a different length:
f_t = φ(m_t)
For each instrument, we’ll train a model that assigns a weight to each feature. To calculate r_it, the return of instrument i in trial t, we use c_i, the intercept term for the instrument; w_ij, the regression weight for feature j on instrument i; and f_tj, the randomly generated value of feature j in trial t:
r_it = c_i + Σ_j w_ij * f_tj
This means that the return of each instrument is calculated as the sum of the returns of the market factor features multiplied by their weights for that instrument. We can fit the linear model for each instrument using historical data.
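A sketch of that calculation for a single instrument in a single trial:
def instrumentTrialReturn(intercept: Double, weights: Array[Double], trialFeatures: Array[Double]): Double =
  intercept + weights.zip(trialFeatures).map { case (w, f) => w * f }.sum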
Now that we have our model for calculating instrument losses from market factors, we need a process for simulating the behavior of market factors. A simple assumption is that each market factor return follows a normal distribution.
Yahoo! has a variety of stock data available for download in CSV format. The script download-all-symbols.sh located in the risk/data directory of the repo, will make a series of REST calls to download histories for all the stocks included in the NASDAQ index.
We also need historical data for our risk factors. For our factors, we’ll use the values of the S&P 500 and NASDAQ indexes, as well as the prices of 30-year treasury bonds and crude oil.
download-symbol.sh SNP factors
download-symbol.sh NDX factors
From each source, for each instrument and factor, we want to derive a list of (date, closing price) tuples.
Different types of instruments may trade on different days, or the data may have missing values for other reasons, so it is important to make sure that our different histories align. First, we need to trim all of our time series to the same region in time. Then, we need to fill in missing values. To deal with time series that are missing values at the start and end dates in the time region, we simply fill in those dates with nearby values in the time region.
To deal with missing values within a time series, we use a simple imputation strategy that fills in an instrument’s price as its most recent closing price before that day.
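A sketch of that fill-forward imputation over a date-sorted price history, assuming the first date already has a value after the boundary filling described above:
def fillForward(history: Seq[(String, Option[Double])]): Seq[(String, Double)] = {
  var lastClose = Double.NaN
  history.map { case (date, close) =>
    close.foreach(c => lastClose = c)   // remember the most recent known closing price
    (date, lastClose)
  }
}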
With these return histories in hand, we can turn to our goal of training predictive models for the instrument returns. For each instrument, we want a model that predicts its two-week return based on the returns of the factors over the same time period. For simplicity, we will use a linear regression model.
We now need a procedure for simulating market conditions by generating random factor returns. That is, we need to decide on a probability distribution over factor return vectors and sample from it. A nice way to visualize a probability distribution over continuous data is a density plot that plots the distribution’s domain versus its PDF. Because we don’t know the distribution that governs the data, we don’t have an equation that can give us its density at an arbitrary point, but we can approximate it through a technique called kernel density estimation.
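A sketch of a one-dimensional Gaussian kernel density estimate, evaluated at a point x for a set of samples and a bandwidth h:
def gaussianKernelDensity(samples: Array[Double], h: Double)(x: Double): Double = {
  val norm = 1.0 / (samples.length * h * math.sqrt(2 * math.Pi))
  norm * samples.map(s => math.exp(-(x - s) * (x - s) / (2 * h * h))).sum
}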
breeze-viz is a Scala library that makes it easy to draw simple plots.
The multivariate normal distribution can help here by taking the correlation information between the factors into account. Each sample from a multivariate normal is a vector. Given values for all of the dimensions but one, the distribution of values along that dimension is normal. But, in their joint distribution, the variables are not independent.
In each trial, we want to sample a set of risk factors, use them to predict the return of each instrument, and sum all those returns to find the full trial loss. To achieve a representative distribution, we want to run thousands or millions of these trials.
We have a few choices in how to parallelize the simulation. We can parallelize along trials, instruments, or both. To parallelize along both, we would create an RDD of instruments and an RDD of trial parameters, and then use the cartesian transformation to generate an RDD of all the pairs.
In addition to calculating VaR at a particular confidence level, it can be useful to look at a fuller picture of the distribution of returns. Are they normally distributed? Do they spike at the extremities? As we did for the individual factors, we can plot an estimate of the probability density function for the joint probability distribution using kernel density estimation.
How do we know whether our estimate is a good estimate? How do we know whether we should simulate with a larger number of trials? In general, the error in a Monte Carlo simulation should be proportional to 1/√n. This means that, in general, quadrupling the number of trials should approximately cut the error in half.
A nice way to get a confidence interval on our VaR statistic is through bootstrapping. We achieve a bootstrap distribution over the VaR by repeatedly sampling with replacement from the set of portfolio returns that are the results of our trials. Each time, we take a number of samples equal to the full size of the trials set and compute a VaR from those samples. The set of VaRs computed from all the times form an empirical distribution, and we can get our confidence interval by simply looking at its quantiles.
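A sketch of that bootstrap, assuming an RDD of trial returns named trialReturns and a hypothetical helper fivePercentVaR that takes the 5% quantile of a sample:
val numResamples = 100
val bootstrappedVaRs = (0 until numResamples).map { _ =>
  val resample = trialReturns.sample(withReplacement = true, fraction = 1.0)
  fivePercentVaR(resample)
}.sorted
// a rough 95% confidence interval from the empirical distribution of resampled VaRs
val confidenceInterval = (bootstrappedVaRs(2), bootstrappedVaRs(97))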
The advent of next-generation DNA sequencing (NGS) technology is rapidly transforming the life sciences into a data-driven field. However, making the best use of this data is butting up against a traditional computational ecosystem that builds on difficult-to-use, low-level primitives for distributed computing (e.g., DRMAA or MPI) and a jungle of semi-structured text-based file formats.
Bioinformaticians spend a disproportionate amount of time worrying about file formats. On top of that, many of the format specifications are incomplete or ambiguous (which makes it hard to ensure implementations are consistent or compliant) and specify ASCII-encoded data. ASCII data is very common in bioinformatics, but it is inefficient and compresses relatively poorly.
We can solve all of these problems in one shot using a serialization framework like Apache Avro. The key lies in Avro’s separation of the data model (i.e., an explicit schema) from the underlying storage file format and also the language’s in-memory representation. Avro specifies how data of a certain type should be communicated between processes, whether that’s between running processes over the Internet, or a process trying to write the data into a particular file format.
The Avro project includes modules for processing Avro-encoded data from many languages, including Java, C/C++, Python, and Perl; after that, the language is free to store the object in memory in whichever way is deemed most advantageous. The separation of data modeling from the storage format provides another level of flexibility/abstraction; Avro data can be stored as Avro-serialized binary objects (Avro container file), in a columnar file format for fast queries (Parquet file), or as text JSON data for maximum flexibility (minimum efficiency). Finally, Avro supports schema evolvability, allowing the user to add new fields as they become necessary, while all the software gracefully deals with new/old versions of the schema.
Overall, Avro is an efficient binary encoding that allows you to easily specify evolvable data schemas, process the same data from many programming languages, and store the data using many formats. Deciding to store your data using Avro schemas frees you from perpetually working with more and more custom data formats, while simultaneously increasing the performance of your computations.
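As a hedged illustration of that separation (assuming the Avro library is on the classpath; the record name, fields, and output path are hypothetical), the schema is declared once as data, and the same in-memory records could just as well be written to a Parquet file or to JSON:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.file.DataFileWriter
import java.io.File

// The data model: an explicit schema, independent of storage format and in-memory representation
val schemaJson = """{"type": "record", "name": "Read", "fields": [
  {"name": "id", "type": "string"},
  {"name": "sequence", "type": "string"}]}"""
val schema = new Schema.Parser().parse(schemaJson)

// An in-memory record conforming to the schema
val record: GenericRecord = new GenericData.Record(schema)
record.put("id", "read-001")
record.put("sequence", "ACGT")

// One possible storage choice: an Avro container file
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, new File("reads.avro"))
writer.append(record)
writer.close()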
BDG’s core set of genomics tools is called ADAM. Starting from a set of mapped reads, this core includes tools that can perform mark-duplicates, base quality score recalibration, indel realignment, and variant calling, among other tasks. ADAM also contains a command-line interface that wraps the core for ease of use. In contrast to HPC, these command-line tools know about Hadoop and HDFS, and many of them can automatically parallelize across a cluster without having to split files or schedule jobs manually.
Parquet is an open source file format specification and a set of reader/writer implementations that we recommend for general use for data that will be used in analytical queries (write once, read many times). It has a data model that is compatible with Avro, Thrift, and Protocol Buffers. Specifically, it supports most of the common database types (int, double, string, etc.), along with arrays and records, including nested types. Significantly, it is a columnar file format, meaning that values for a particular column from many records are stored contiguously on disk. This physical data layout allows for far more efficient data encoding/compression and significantly reduces query times by minimizing the amount of data that must be read and deserialized. Parquet allows a different encoding/compression scheme to be specified for each column, including run-length, dictionary, and delta encoding.
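A hedged sketch of writing and reading Parquet through Spark SQL's DataFrame API (the column names and path are hypothetical); selecting a subset of columns reads only those columns from disk.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq((1L, "A", 0.9), (2L, "C", 0.1))).toDF("id", "base", "quality")
df.write.parquet("hdfs:///tmp/example.parquet")                  // values stored column by column
val back = sqlContext.read.parquet("hdfs:///tmp/example.parquet")
back.select("base", "quality").show()                            // only the requested columns are read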
The PySpark API is used for interacting with Spark through Python, and the Thunder project is built on top of PySpark for processing large amounts of time series data in general, and neuroimaging data in particular. PySpark is a particularly flexible tool for exploratory big data analysis, because it integrates well with the rest of the PyData ecosystem, including matplotlib for visualization and IPython Notebook (Jupyter) for “executable documents.”
Python is a favorite tool for many data scientists, due to its high-level syntax and extensive library of packages, among other things. We can submit Python scripts using spark-submit, which detects the .py extension on our scripts. PySpark supports the IPython shell by setting the environment variable IPYTHON=1, which we recommend doing universally. When the Python shell starts, it creates a Python SparkContext object through which we interact with the cluster. Once the SparkContext is available, the PySpark API is very similar to the Scala API.
The only restrictions are that the Python function objects must be serializable with cloudpickle (which handles anonymous lambda functions), and any modules referenced in their closures must be available on the PYTHONPATH of the executor Python processes. The latter can be ensured either by installing the modules cluster-wide or by having Spark explicitly distribute the corresponding module ZIP/EGG files and add them to the PYTHONPATH, which is accomplished with a call to sc.addPyFile().
The PySpark API can lag behind the Scala API to a certain extent, so in some cases, features become available in Scala more rapidly.
Python has become a favorite tool for scientific computing and data science, and it is now used for many applications that would traditionally have called for MATLAB, R, or Mathematica. The reasons include its readable, high-level syntax, its extensive ecosystem of scientific and data-handling packages, and how easily it integrates with other tools and languages.
Libraries to keep in mind in particular include NumPy and SciPy for numerical computation, pandas for tabular data manipulation, matplotlib for visualization, scikit-learn for machine learning, and IPython (Jupyter) for interactive, notebook-style work.
When PySpark’s Python interpreter starts, it also starts a JVM with which it communicates through a socket. PySpark uses the Py4J project to handle this communication. The JVM functions as the actual Spark driver, and loads a JavaSparkContext that communicates with the Spark executors across the cluster. Python API calls to the SparkContext object are then translated into Java API calls to the JavaSparkContext.
The Spark executors on the cluster start a Python interpreter for each core, with which they communicate data through a pipe when they need to execute user code. A Python RDD in the local PySpark client corresponds to a PythonRDD object in the local JVM. The data associated with the RDD actually lives in the Spark JVMs as Java objects.
When an API call is made on the Python RDD, any associated code (e.g., Python lambda function) is serialized via cloudpickle and distributed to the executors. The data is then converted from Java objects to a Python-compatible representation (e.g., pickle objects) and streamed to executor-associated Python interpreters through a pipe. Any necessary Python processing is executed in the interpreter, and the resulting data is stored back as an RDD (as pickle objects by default) in the JVMs.
Thunder is a Python tool set for processing large amounts of spatial/temporal data sets (i.e., large multidimensional matrices) on Spark. It makes heavy use of NumPy for matrix computations and also the MLlib library for distributed implementations of some statistical techniques.
Thunder requires Spark, as well as the Python libraries NumPy, SciPy, matplotlib, and scikit-learn.
The thunder command essentially wraps the PySpark shell. As with PySpark, the starting point for most computations is the ThunderContext variable tsc, which wraps the Python SparkContext with Thunder-specific functionality.
Thunder was designed especially with neuroimaging data sets in mind. Therefore, it is geared toward analyzing data from large sets of images that are often captured over time.
Zebrafish dataset: The zebrafish is a commonly used model organism in biology research. It is small, reproduces quickly, and is used as a model for vertebrate development. It’s also interesting because it has exceptionally fast regenerative capabilities. In the context of neuroscience, the zebrafish makes a great model because it is transparent and the brain is small enough that it is essentially possible to image it entirely at a high-enough resolution to distinguish individual neurons.
While analyzing the collection of images may be useful for certain operations (e.g., normalizing images in certain ways), it’s difficult to take the temporal relationship of the images into account. To do so, we’d rather work with the image data as a collection of pixel/voxel time series. This is exactly what the Thunder Series object is for.
More generally, the two core data types in Thunder, Series and Images, both inherit from the Data class, which wraps a Python RDD object and exposes part of the RDD API. The Data class models RDDs of key-value pairs, where the key represents some type of semantic identifier (e.g., a tuple of coordinates in space) and the value is a NumPy array of actual data. For the Images object, the key could be a time point, for example, and the value the image at that time point formatted as a NumPy array. For the Series object, the key might be an n-dimensional tuple with the coordinates of the corresponding voxel, while the value is a one-dimensional NumPy array representing the time series of measurements at that voxel. All the arrays in a Series must have the same dimensions.
We’ll use the K-means algorithm to cluster the time series from the fish into multiple clusters, in an attempt to characterize classes of neural behavior. We will use data that is already persisted as Series data and packaged in the repo; it is larger than the image data used previously.
A Spark application consists of a driver process, which, in spark-shell’s case, is the process that the user is interacting with, and a set of executor processes scattered across nodes on the cluster. The driver is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime.
At the top of the execution model are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs that the action depends on and formulates an execution plan that starts with computing the farthest back RDDs and culminates in the RDDs required to produce the action’s results. The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different partition of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.
What determines whether data needs to be shuffled? For the RDDs returned by so-called narrow transformations like map, the data required to compute a single partition resides in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. However, Spark also supports transformations with wide dependencies like groupByKey and reduceByKey. In these, the data required to compute a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.
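A small sketch of the distinction: the map below is a narrow transformation and is pipelined into the same stage as the shuffle write, while reduceByKey has a wide dependency and forces a new stage (the words RDD and its contents are illustrative).

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
println(counts.toDebugString)   // the indented lineage shows the shuffle (stage) boundary
counts.collect()                // the action launches a job with two stages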
At each stage boundary, data is written to disk by tasks in the parent stage and then fetched over the network by tasks in the child stage. Thus, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage. Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. Choosing too few partitions can result in slowness when each task is forced to handle too much data. The amount of time it takes a task to complete often increases nonlinearly with the size of the data assigned to it, because aggregation operations must spill to disk when their data does not fit in memory. On the other side, a large number of partitions leads to increased overhead in tasks on the parent side when sorting records by their target partition, as well as more of the overhead associated with scheduling and launching each task on the child side.
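Continuing the sketch above, the numPartitions argument sets how many partitions the child (post-shuffle) stage receives:

val counts100 = words.map(w => (w, 1)).reduceByKey(_ + _, 100)  // shuffle the data into 100 partitions
counts100.partitions.length                                     // 100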
As a distributed system, Spark often needs to serialize the raw Java objects it operates on. When data is cached in a serialized format or transferred over the network for a shuffle, Spark needs a byte-stream representation of RDD contents. Spark accepts a pluggable Serializer for defining this serialization and deserialization. By default, Spark uses Java Object Serialization, which can serialize any Java object that implements the Serializable interface. Nearly always, Spark should be configured to instead use Kryo serialization. Kryo defines a more compact format that serializes and deserializes far faster. The “catch” is that, to get this efficiency, Kryo requires registering any custom classes defined in the application up front. Kryo will still work without registering the classes, but the serialization will take up more space and time because the class name must be written out before each record.
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("MyApp")
// Registering application classes up front lets Kryo avoid writing full class names with every record
conf.registerKryoClasses(Array(classOf[MyCustomClass1], classOf[MyCustomClass2]))
Accumulators are a Spark construct that allow collecting some statistics “on the side” while a job is running. The code executing in each task can add to the accumulator, and the driver can access its value. Accumulators are useful in situations like counting the number of bad records a job encounters or computing the summed error during a stage of an optimization process.
A task only contributes to the accumulator the first time it runs. For example, if a task completes successfully, but its outputs are lost and it needs to be rerun, it will not increment the accumulator again.
Accumulators are an optimization in the sense that the same results could instead be obtained by caching the RDD and running a separate action over it; accumulators achieve this much more efficiently, without caching the data or executing another job.
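A minimal, self-contained sketch (the record layout here is hypothetical): count malformed lines “on the side” while parsing.

val lines = sc.parallelize(Seq("1,2,0.5,true", "3,4,0.9,true", "garbage"))
val badRecords = sc.accumulator(0L, "bad records")
val parsed = lines.flatMap { line =>
  val pieces = line.split(',')
  if (pieces.length == 4) Some(pieces) else { badRecords += 1L; None }
}
parsed.count()               // the action runs the job, which updates the accumulator
println(badRecords.value)    // only the driver reads the accumulated value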
A few of Spark’s transformations and actions are particularly useful when you’re exploring and trying to get a feel for a new data set. Some of these operators employ randomness. These operators use a seed to ensure determinism in the cases that task results are lost and need to be recomputed or multiple actions take advantage of the same uncached RDD.
take enables inexpensively looking at the first few elements of an RDD.
takeSample is useful for pulling a representative sample of the data into the driver for charting, playing with locally, or exporting for nondistributed analysis in a different environment like R.
top collects the k largest records in a data set according to a given Ordering. It is useful in a variety of situations, such as examining the highest-scoring records after assigning each record a score.
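A quick sketch of all three on a toy RDD of scores:

val scores = sc.parallelize(Seq(3.0, 9.5, 1.2, 7.7, 4.4, 8.1))
scores.take(2)                           // cheaply grab the first two elements
scores.takeSample(false, 3, seed = 11L)  // pull a random sample of three into the driver
scores.top(3)                            // the three largest values under the natural Ordering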
Spark examples commonly employ textFile, but it is usually recommended to store large data sets in binary formats, both to take up less space and to enforce typing. Avro and Parquet files are the standard row and columnar formats, respectively, used to store data on Hadoop clusters. Avro also refers to an in-memory representation of on-disk data from both of these formats.
Spark Core refers to Spark’s distributed execution engine and the core Spark APIs. In addition to Spark Core, Spark contains a gaggle of subprojects that offer functionality on top of its engine.